Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Active Message receiver callbacks #186

Merged
merged 14 commits into from
Mar 15, 2024

Conversation

pentschev
Copy link
Member

@pentschev pentschev commented Feb 6, 2024

Adds a new pattern to receive Active Messages where the receiving endpoint registers a callback and the sender informs receiver of the callback to use, the receiver then executes that callback and does not require an explicit ep->amRecv() call.

Sample code:

// Define AM receiver callback's owner and id for callback
ucxx::AmReceiverCallbackInfo receiverCallbackInfo("TestApp", 0);

// Define AM receiver callback and register with worker
std::shared_ptr<ucxx::Request> receivedRequest = nullptr;
auto callback = ucxx::AmReceiverCallbackType(
  [this, &receivedRequest](std::shared_ptr<ucxx::Request> req) {
    {
      receivedRequest = req;
    }
  });
worker->registerAmReceiverCallback(receiverCallbackInfo, callback);

// Submit and wait for transfers to complete
std::shared_ptr<ucxx::Request> sendRequest;
requests.push_back(ep->amSend(buffer, size, memoryType, receiverCallbackInfo));
while (!sendRequest->isCompleted())
  worker->progress()

while (receivedRequest == nullptr)
  worker->progress();

This PR only implements C++ parts, #187 tracks the missing Python implementation.

@pentschev pentschev requested review from a team as code owners February 6, 2024 23:03
@pentschev pentschev added breaking Introduces a breaking change feature request New feature or request labels Feb 6, 2024
@pentschev
Copy link
Member Author

@mdemoret-nv FYI

Any non-const container that may have an external reference (such as a
`std::string_view`) may change its underlying content and invalidate
hashing, thus we must ensure the value can't change once registered.
Copy link
Contributor

@wence- wence- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, I think

cpp/include/ucxx/worker.h Show resolved Hide resolved
Comment on lines 35 to 41
if (_receiverCallback) {
_request->_callback = [this](ucs_status_t, std::shared_ptr<void>) {
_receiverCallback(_request);
};
}

_request->callback(request, status);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make more sense to do this rewriting on the _request in the constructor, rather than the callback? That way you wouldn't even need the _receiverCallback slot I think

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 1542744

Comment on lines 76 to 78

memcpy(serialized.data() + offset, &memoryType, sizeof(memoryType));
offset += sizeof(memoryType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor polish, perhaps unnecessary:

auto encode = [&offset, &serialized](void const *data, size_t bytes) {
  memcpy(serialized.data() + offset, data, bytes);
  offset += bytes;
}

And then:

encode(&memoryType, sizeof(memoryType));
encode(&hasReceiverCallback, sizeof(hasReceiverCallback));
if (hasReceiveCallback) {
  encode(&ownerSize, sizeof(ownerSize));
  encode(receiverCallbackInfo->owner.c_str(), ownerSize);
  encode(&receiverCallbackInfo->id, sizeof(receiverCallbackInfo->id));
}

And something matching on the decode side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an excellent idea! Done in 1920ff1 .

auto allocatorType = *static_cast<const ucs_memory_type_t*>(header);
// auto amHeader = AmHeader::deserialize(static_cast<const char*>(header));
auto amHeader =
AmHeader::deserialize(std::string(static_cast<const char*>(header), header_length));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copies the header, suggestion:

Suggested change
AmHeader::deserialize(std::string(static_cast<const char*>(header), header_length));
AmHeader::deserialize(std::string_view(static_cast<const char*>(header), header_length));

And accept a std::string_view in deserialize.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 4304378 .

@pentschev
Copy link
Member Author

Thanks @wence- for reviewing!

@pentschev
Copy link
Member Author

/merge

@rapids-bot rapids-bot bot merged commit d62dff4 into rapidsai:branch-0.37 Mar 15, 2024
57 checks passed
@pentschev pentschev deleted the am-receiver-callback branch April 11, 2024 11:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking Introduces a breaking change feature request New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants